[Delta] Add Changelog support #6794

Open
SanJSp wants to merge 14 commits into delta-io:master from SanJSp:changelog-pr

Collaborator

@SanJSp SanJSp commented May 15, 2026

Please read the comments I added in the conversation; they explain where this implementation differs from the path I expected to work cleanly.

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (Build)

Description

Guarded behind a feature flag, this PR implements B.2 of the CDC SPIP, i.e. the Changelog interface. It also adds catalog-driven Auto-CDF (TableCatalog.loadChangelog) for the DSv2 connector, so the kernel-based V2 reader stack answers the SELECT * FROM t CHANGES FROM VERSION/TIMESTAMP ... batch queries introduced by SPARK-55668 (apache/spark#55508).
The streaming CDC entrypoint shipped in #6359 is preserved unchanged and coexists with the new batch path through an explicit CdcReadMode flag.

What the PR adds

  • DeltaCatalogChangelogSupport (new, in spark-unified) — abstract Scala class between AbstractDeltaCatalog (sparkV1) and the hybrid DeltaCatalog (spark-unified). Overrides loadChangelog(ident, changelogInfo) and dispatches on the ChangelogRange subtype (version / timestamp / unbounded). Resolves the table through loadTable (so it works for both SparkTable and DeltaTableV2), loads the latest Kernel snapshot via SnapshotManagerFactory, validates row tracking is enabled, applies bounds-inclusivity adjustments, and returns a DeltaChangelog.

The class lives in spark-unified because the implementation references sparkV2 classes (SparkTable, DeltaChangelog, SnapshotManagerFactory, V2SchemaUtils) and sparkV1 cannot depend on sparkV2. DeltaCatalog.java now extends DeltaCatalogChangelogSupport.

  • CdcReadMode (new enum, sparkV2) — replaces the prior boolean isCDCRead on PartitionUtils.createDeltaParquetReaderFactory:

    • NONE — non-CDC scan (SparkBatch).
    • STREAMING — opt-in streaming CDC (SparkMicroBatchStream when readChangeFeed = true). PartitionUtils owns the CDC schema augmentation and CDCReadFunction wrap, as before.
    • BATCH_CHANGELOG — Auto-CDF (DeltaChangelogBatch). PartitionUtils leaves schema and reader untouched; CDCPartitionReaderFactory in DeltaChangelogBatch injects _change_type / _commit_version / _commit_timestamp as per-partition constants instead. This avoids the double-injection / schema-vs-reader misalignment that the previous shared isCDCRead=true path caused.
  • Feature flag DELTA_CHANGELOG_V2_ENABLED (changelogV2.enabled, internal, default false). When disabled, DeltaCatalogChangelogSupport.loadChangelog delegates to super.loadChangelog, which surfaces the familiar UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE error. This lets the implementation land without changing user-visible behavior until tests catch up.

  • Per-commit ordering in DeltaChangelogBatch.planInputPartitions: emits all RemoveFile partitions before AddFile partitions within a single commit, so Spark's batch CDC post-processor (ResolveChangelogTable) sees preimage → postimage pairs regardless of the action order in the on-disk commit log. The Delta protocol does not guarantee action order within a commit; pinning the partition order here keeps the test expectations stable across protocol implementations.
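The per-commit ordering rule above can be sketched roughly as follows. This is only an illustration: FileAction, Kind, and orderForChangelog are hypothetical stand-ins, not the Kernel action types or the actual planInputPartitions code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins; not the PR's DeltaChangelogBatch code.
final class CommitOrderingSketch {
  // Declaration order doubles as sort order: REMOVE sorts before ADD.
  enum Kind { REMOVE, ADD }

  static final class FileAction {
    final long commitVersion;
    final Kind kind;
    final String path;

    FileAction(long commitVersion, Kind kind, String path) {
      this.commitVersion = commitVersion;
      this.kind = kind;
      this.path = path;
    }

    @Override
    public String toString() {
      return commitVersion + ":" + kind + ":" + path;
    }
  }

  // Sorts by commit version, then puts removes ahead of adds within a commit.
  // List.sort is stable, so actions of the same kind keep their log order.
  static List<FileAction> orderForChangelog(List<FileAction> actions) {
    List<FileAction> sorted = new ArrayList<>(actions);
    sorted.sort(
        Comparator.comparingLong((FileAction a) -> a.commitVersion)
            .thenComparing((FileAction a) -> a.kind));
    return sorted;
  }

  public static void main(String[] args) {
    // Commit 2 lists its AddFile before its RemoveFile on disk.
    List<FileAction> log = List.of(
        new FileAction(2, Kind.ADD, "b.parquet"),
        new FileAction(2, Kind.REMOVE, "a.parquet"),
        new FileAction(1, Kind.ADD, "a.parquet"));
    System.out.println(orderForChangelog(log));
  }
}
```

Sorting on an explicit key, rather than relying on log order, is what makes the result independent of how a writer happened to serialize the commit.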

Why this is split this way

The DSv2 Changelog interface (apache/spark#55426) covers both batch and streaming, but the two enter Delta through different paths today:

DeltaChangelogScan.toMicroBatchStream() is left as a TODO in DeltaChangelogScan for a follow-up PR. Implementing it would let the catalog-driven Changelog also drive streaming reads, completing the surface that apache/spark#55637 builds on for deduplicationMode = netChanges.

Limitations / known follow-ups

  • Row tracking must be enabled on the source table (otherwise Auto-CDF surfaces a "requires row tracking" analysis error).
  • UnboundedRange is rejected with DELTA_CHANGELOG_UNBOUNDED_RANGE; Auto-CDF always operates over a bounded range.
  • DeltaChangelogScan does not yet implement toMicroBatchStream() — see inline TODO.
  • DeltaChangelogBatch.planInputPartitions still goes through StreamingHelper.getCommitActionsFromRangeUnsafe (marked TODO in the file). The helper is generic — only the class name is streaming-flavored. A separate rename / extract pass would be good.

Build dependency

The new code uses the Changelog / ChangelogInfo / ChangelogRange interfaces added in Spark 4.2 (apache/spark#55426). This PR is tested by cherry-picking changes from #6657, which re-enables the Spark 4.2 snapshot cross-build row.

How was this patch tested?

New tests (17 total, all green):

  • DeltaChangelogDirectBatchExecutionTest — exercises the DeltaChangelogScanBatchPartitionReader path directly, without going through SQL. Covers: initial insert + delete with paired DELETE/INSERT output, UPDATE-as-CoW producing paired preimage/postimage rows at the same commit, range slicing on non-zero start, and rowId / rowVersion field-reference contract.

  • DeltaChangelogCatalogIntegrationTest — exercises the catalog-routed entrypoint (TableCatalog.loadChangelog) over both SQL CHANGES FROM and the DataFrame API. Covers all four ChangelogRange shapes (version range, timestamp range, open-ended, exclusive bounds), boundary inclusivity, and the failure modes (timestamp before earliest commit, timestamp after latest commit, empty exclusive range, unbounded rejection).
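The boundary-inclusivity cases these tests cover could be modelled roughly as below. This is a sketch under assumptions: normalize and InclusiveRange are hypothetical names, and the real bounds handling in the PR may differ, for example in how an end version beyond the latest commit is treated.

```java
import java.util.Optional;

// Hypothetical sketch of turning user-supplied bounds into an inclusive version range.
final class VersionBoundsSketch {
  static final class InclusiveRange {
    final long start;
    final long end;

    InclusiveRange(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  // Converts exclusive bounds to inclusive ones and defaults a missing end
  // to the latest table version. An end beyond the latest version is not
  // handled here; that policy is left open.
  static InclusiveRange normalize(
      long start,
      boolean startInclusive,
      Optional<Long> end,
      boolean endInclusive,
      long latestVersion) {
    long first = startInclusive ? start : start + 1;
    long last = end.map(e -> endInclusive ? e : e - 1).orElse(latestVersion);
    if (first > latestVersion) {
      throw new IllegalArgumentException(
          "start version " + first + " is after latest version " + latestVersion);
    }
    if (first > last) {
      throw new IllegalArgumentException("empty changelog range");
    }
    return new InclusiveRange(first, last);
  }

  public static void main(String[] args) {
    // Exclusive on both sides: (1, 3) covers only version 2.
    InclusiveRange r = normalize(1, false, Optional.of(3L), false, 5);
    System.out.println(r.start + ".." + r.end);
  }
}
```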

Pre-existing suites still pass:

Build / test command used locally:

build/sbt "sparkV2/testOnly io.delta.spark.internal.v2.read.changelog.*"

Does this PR introduce any user-facing changes?

No, not while the feature flag is at its default.

With changelogV2.enabled = true (internal flag, default false): SELECT * FROM <table> CHANGES FROM VERSION/TIMESTAMP ... and the DataFrame .changes(...) builder become functional on row-tracking-enabled Delta tables via the V2 connector. Previously the V2 catalog rejected these queries with UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE.
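The gating described here amounts to an override that falls back to the parent when the flag is off. A minimal sketch, assuming a plain string map in place of SQLConf and hypothetical BaseCatalog / ChangelogCatalog classes; only the flag suffix and the error name are taken from this description.

```java
import java.util.Map;

// Hypothetical sketch of a flag-gated override; not the PR's catalog classes.
final class FlagGateSketch {
  // Key suffix as quoted in the PR description; the full conf key is not shown there.
  static final String FLAG_KEY = "changelogV2.enabled";

  static class BaseCatalog {
    String loadChangelog(String ident) {
      throw new UnsupportedOperationException("UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE");
    }
  }

  static final class ChangelogCatalog extends BaseCatalog {
    private final Map<String, String> conf;

    ChangelogCatalog(Map<String, String> conf) {
      this.conf = conf;
    }

    @Override
    String loadChangelog(String ident) {
      boolean enabled = Boolean.parseBoolean(conf.getOrDefault(FLAG_KEY, "false"));
      if (!enabled) {
        // Flag off: behave exactly like the parent, so users see the old error.
        return super.loadChangelog(ident);
      }
      return "changelog(" + ident + ")";
    }
  }

  public static void main(String[] args) {
    ChangelogCatalog on = new ChangelogCatalog(Map.of(FLAG_KEY, "true"));
    System.out.println(on.loadChangelog("db.t"));
  }
}
```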

Collaborator Author

@SanJSp SanJSp left a comment

Notes on a few non-obvious decisions in this PR — flagging them here so reviewers don't have to reconstruct the reasoning. Quotes are text from LLMs.

  Configuration hadoopConf,
  SQLConf sqlConf,
- boolean isCDCRead) {
+ CdcReadMode cdcReadMode) {
Collaborator Author

@SanJSp SanJSp May 15, 2026

Handling different cdcReadModes

Problem. Before this PR createDeltaParquetReaderFactory took boolean isCDCRead. Two callers passed true for completely different semantics: SparkMicroBatchStream for streaming CDC (PartitionUtils owns schema augmentation + CDCReadFunction wrap), and DeltaChangelogBatch for Auto-CDF (outer CDCPartitionReaderFactory owns CDC tail injection). The method had to disambiguate by inspecting readDataSchema for a _metadata field — coupling a CDC decision to a row-tracking schema check.

Options.

  1. Keep isCDCRead boolean, continue the _metadata-presence heuristic — fragile; breaks if row tracking ever appears without Auto-CDF.
  2. Split into two factory methods — duplicates the shared DV / row-tracking / format wiring.
  3. Single factory with CdcReadMode { NONE, STREAMING, BATCH_CHANGELOG }.

Chosen. Option 3. Dispatch contract is explicit at the call site, shared wiring stays in one place, and a future BATCH_CHANGELOG_NETCHANGES mode (post apache/spark#55637) drops in without changing call signatures.
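For illustration, Option 3 might look roughly like the enum below. The three constants and injectsCdcAtReaderLevel() are named in this PR; the readerSchema helper and the string-list schema are hypothetical simplifications of the real StructType handling.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: the enum shape follows the PR text, the helper is hypothetical.
final class CdcReadModeSketch {
  enum CdcReadMode {
    NONE(false),            // plain scan, no CDC columns
    STREAMING(true),        // reader factory augments the schema and wraps the reader
    BATCH_CHANGELOG(false); // outer factory injects CDC columns as constants

    private final boolean injectsAtReaderLevel;

    CdcReadMode(boolean injectsAtReaderLevel) {
      this.injectsAtReaderLevel = injectsAtReaderLevel;
    }

    boolean injectsCdcAtReaderLevel() {
      return injectsAtReaderLevel;
    }
  }

  static final List<String> CDC_TAIL =
      List.of("_change_type", "_commit_version", "_commit_timestamp");

  // Appends the CDC tail only when the mode says the reader level owns it.
  static List<String> readerSchema(List<String> dataColumns, CdcReadMode mode) {
    List<String> columns = new ArrayList<>(dataColumns);
    if (mode.injectsCdcAtReaderLevel()) {
      columns.addAll(CDC_TAIL);
    }
    return columns;
  }

  public static void main(String[] args) {
    List<String> data = List.of("id", "value");
    for (CdcReadMode mode : CdcReadMode.values()) {
      System.out.println(mode + " -> " + readerSchema(data, mode));
    }
  }
}
```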

As in the comment below, I'm a bit confused about how batch and streaming should coexist. Advice is appreciated.

  // CDCPartitionReaderFactory injects the CDC tail columns as constants instead.
  Optional<CDCSchemaContext> cdcSchemaContext =
-     isCDCRead
+     cdcReadMode.injectsCdcAtReaderLevel()
Collaborator Author

@SanJSp SanJSp May 15, 2026

Streaming CDC and Auto-CDF coexistence

Problem. The streaming CDC entrypoint shipped in #6359 mutates readDataSchema with CDC tail columns (_change_type, _commit_version, _commit_timestamp) and wraps the reader with CDCReadFunction to inject those columns per row. Auto-CDF (DeltaChangelogBatch) wraps its delegate with CDCPartitionReaderFactory, which injects the same columns as per-partition constants (one value per CDC file change). If both fire, the schema sees the CDC tail twice and projection blows up with GenericInternalRow cannot be cast to Long.

Options.

  1. Tear down the OSS streaming-CDC path and route streaming through Auto-CDF — invasive, breaks streaming tests until toMicroBatchStream lands.
  2. Heuristic: skip the inner wrap when _metadata is in readDataSchema — works incidentally; same fragility called out above.
  3. Explicit CdcReadMode dispatch: injectsCdcAtReaderLevel() is true only for STREAMING.

Chosen. Option 3. Both pathways stay live; the connector picks the right wrapper from the mode at the call site. Streaming behavior is unchanged from #6359.
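To make "per-partition constants" concrete, here is a rough sketch of a wrapper that appends the same three CDC values to every row coming out of a delegate reader. withCdcTail and the list-based rows are hypothetical; the actual CDCPartitionReaderFactory works on Spark's PartitionReader and InternalRow types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of constant-column injection around a delegate row iterator.
final class ConstantTailReaderSketch {
  // Every row from one file change gets the same change type, version, and timestamp.
  static Iterator<List<Object>> withCdcTail(
      Iterator<List<Object>> rows,
      String changeType,
      long commitVersion,
      long commitTimestamp) {
    return new Iterator<List<Object>>() {
      @Override
      public boolean hasNext() {
        return rows.hasNext();
      }

      @Override
      public List<Object> next() {
        List<Object> out = new ArrayList<>(rows.next());
        out.add(changeType);
        out.add(commitVersion);
        out.add(commitTimestamp);
        return out;
      }
    };
  }

  public static void main(String[] args) {
    List<List<Object>> fileRows = List.of(List.<Object>of(1, "x"), List.<Object>of(2, "y"));
    Iterator<List<Object>> it = withCdcTail(fileRows.iterator(), "insert", 3L, 1000L);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

If the delegate had already appended its own tail, each row would carry six extra values against a schema that expects three, which is the kind of misalignment described above.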

Tbh, I was a bit overwhelmed by how to handle streaming coexisting with our batch approach. I'm not sure the chosen path is the correct one.

Collaborator

Following on the comment on CdcReadMode: Consider Streaming write-time CDF a completely unrelated feature.
It's not possible to read both read-time CDF and write-time CDF in the same query, so it's not possible for the problem you describe to occur.

The important part here will be to make the distinction between the two features really clear to avoid confusion. This can be as simple as renaming isCDCRead to isWriteTimeCDCRead.

DeltaChangelogBatch will then call createDeltaParquetReaderFactory passing isWriteTimeCDCRead=false.

Collaborator Author

Done. Renamed; DeltaChangelogBatch passes isWriteTimeCDCRead=false. Javadoc updated to make the read-time vs write-time distinction explicit.

* classes (SparkTable, DeltaChangelog, SnapshotManagerFactory, V2SchemaUtils) which
* the sparkV1 module that hosts [[AbstractDeltaCatalog]] cannot depend on.
*/
abstract class DeltaCatalogChangelogSupport extends AbstractDeltaCatalog {
Collaborator Author

@SanJSp SanJSp May 15, 2026

Why the loadChangelog override lives here, not in AbstractDeltaCatalog

Problem. The 1:1 port placed loadChangelog directly inside AbstractDeltaCatalog. That override references sparkV2 types (SparkTable, DeltaChangelog, SnapshotManagerFactory, V2SchemaUtils), but AbstractDeltaCatalog lives in sparkV1, and sparkV2 already depends on sparkV1 transitively (sparkV2 → sparkV1Filtered → sparkV1). Adding a sparkV1 → sparkV2 edge would close a dependency cycle, and the module graph would stop being a DAG.

Options.

  1. Move the sparkV2 changelog classes (DeltaChangelog, SparkTable, SnapshotManagerFactory, ...) into sparkV1. This pulls Kernel-based reader code into the V1 module, against the Hybrid pattern introduced in #5726 ([Spark] Hybrid Delta connector combining V1 (default) and V2 connectors). But I think we want to keep DSv2-related changes in V2, right?
  2. Reflection lookup from sparkV1 — avoids compile-time deps but hides API contracts and is invisible to IDE tooling.
  3. Implement the override one level higher, in spark-unified (which already depends on both V1 and V2), via an intermediate class.

Chosen. Option 3 — this class. It mirrors how DeltaCatalog.java already routes loadCatalogTable / loadPathTable between V1 and V2 from spark-unified. The only structural difference from the internal-source port is where the override is defined; the implementation logic is unchanged.
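The dependency argument can be checked mechanically. The sketch below models the module edges named in this comment as a plain map and runs a depth-first cycle check; ModuleGraphSketch is a hypothetical helper, not part of the build.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: depth-first cycle detection over a module dependency map.
final class ModuleGraphSketch {
  static boolean hasCycle(Map<String, List<String>> deps) {
    Set<String> done = new HashSet<>();
    Set<String> onPath = new HashSet<>();
    for (String module : deps.keySet()) {
      if (visit(module, deps, done, onPath)) {
        return true;
      }
    }
    return false;
  }

  private static boolean visit(
      String module, Map<String, List<String>> deps, Set<String> done, Set<String> onPath) {
    if (onPath.contains(module)) {
      return true; // reached a module that is already on the current path
    }
    if (done.contains(module)) {
      return false; // fully explored earlier, no cycle through it
    }
    onPath.add(module);
    for (String dep : deps.getOrDefault(module, List.of())) {
      if (visit(dep, deps, done, onPath)) {
        return true;
      }
    }
    onPath.remove(module);
    done.add(module);
    return false;
  }

  public static void main(String[] args) {
    Map<String, List<String>> deps = new HashMap<>();
    deps.put("spark-unified", List.of("sparkV1", "sparkV2"));
    deps.put("sparkV2", List.of("sparkV1Filtered"));
    deps.put("sparkV1Filtered", List.of("sparkV1"));
    deps.put("sparkV1", List.of());
    System.out.println("current graph has cycle: " + hasCycle(deps));

    // Option 1 or a direct reference would add this edge.
    deps.put("sparkV1", List.of("sparkV2"));
    System.out.println("with sparkV1 -> sparkV2: " + hasCycle(deps));
  }
}
```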

I'm not sure whether I've imposed limitations on myself that don't exist. When I see V1 vs V2 I always assume a clear separation. Is that the case? Or could I simply have implemented this in sparkV1?

vr.endingBoundInclusive())
case tr: TimestampRange =>
val catalogTable = resolveDeltaCatalogTable(ident)
val deltaLog = DeltaLog.forTable(spark, catalogTable)
Collaborator

Direct access to the delta log at this level of abstraction is a no-go imo.

buildChangelog is also doing a bit too much. For now, what you really need is just to expose the table schema. Later on, surfacing containsCarryoverRows / ... may turn out to be trickier and require more state to be loaded, but for now all the pre-processing is a bit much.

A different structure will be simpler and delegate work to a later stage: have DeltaChangelog wrap a SparkTable (similar to https://docs.google.com/document/d/1NpJ9Q4wvU-tp3r67RC8MKnm-0J4Y0n7mSOC6iiiBJNo/edit?tab=t.0#heading=h.2gjhdgzfrm6g).
I would also just store the changelogInfo in DeltaChangelog, without trying to pre-process it. That makes the ChangelogSupport extension on the catalog really straightforward.

We only support reading CDC here on DSv2 tables. That means SparkTable for Delta. You should reject reads on DeltaTableV2 below (which is really a V1 table, don't believe its name).

Collaborator Author

DeltaTableV2 reads are now rejected with a new error, DELTA_CHANGELOG_REQUIRES_V2_TABLE. Removed the direct DeltaLog access. DeltaChangelog now wraps the SparkTable and defers the work to the read path (DeltaChangelogScanBuilder.build and DeltaChangelogBatch.planInputPartitions).
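In outline, the resulting shape is something like the sketch below. All types here are simplified, hypothetical stand-ins that only borrow the names from this thread; the real classes implement Spark's DSv2 interfaces and carry far more state.

```java
// Hypothetical stand-ins that mirror the names in this thread, nothing more.
final class ChangelogWrapSketch {
  interface Table {
    String name();
  }

  static final class SparkTable implements Table {
    private final String name;
    SparkTable(String name) { this.name = name; }
    @Override public String name() { return name; }
  }

  // A V1 table despite its name, per the review comment.
  static final class DeltaTableV2 implements Table {
    private final String name;
    DeltaTableV2(String name) { this.name = name; }
    @Override public String name() { return name; }
  }

  static final class ChangelogInfo {
    final String rangeDescription;
    ChangelogInfo(String rangeDescription) { this.rangeDescription = rangeDescription; }
  }

  // Holds the resolved table and the raw request; range resolution happens later.
  static final class DeltaChangelog {
    final SparkTable table;
    final ChangelogInfo info;
    DeltaChangelog(SparkTable table, ChangelogInfo info) {
      this.table = table;
      this.info = info;
    }
  }

  static DeltaChangelog loadChangelog(Table resolved, ChangelogInfo info) {
    if (resolved instanceof SparkTable) {
      return new DeltaChangelog((SparkTable) resolved, info);
    }
    throw new UnsupportedOperationException(
        "DELTA_CHANGELOG_REQUIRES_V2_TABLE: " + resolved.name());
  }

  public static void main(String[] args) {
    DeltaChangelog c = loadChangelog(new SparkTable("db.t"), new ChangelogInfo("versions 1 to 3"));
    System.out.println(c.table.name() + " / " + c.info.rangeDescription);
  }
}
```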

Comment on lines +104 to +111
val engine = DefaultEngine.create(deltaLog.newDeltaHadoopConf())
val snapshotManager =
SnapshotManagerFactory.create(tablePath, engine, Optional.of(catalogTable))

// The latest snapshot is needed to default the ending version when the user did not
// specify one, and to surface a clear "start > latest" error before issuing a snapshot
// load that would otherwise fail with a low-level kernel error.
val latestVersion = snapshotManager.loadLatestSnapshot().getVersion()
Collaborator

You'll also want to use the snapshotManager interface to resolve timestamps -> versions, which is currently done by calling directly into DeltaLog.
E.g. startVersion probably through getActiveCommitAtTime.

Main point: you shouldn't create a snapshotManager yourself; rely on the resolved SparkTable to do this instead. I'm not sure how well time-travel is implemented at this stage, so you may need to extend it.
E.g. we may want to start by resolving the SparkTable directly at startVersion / startTimestamp by calling loadTable(ident, version/timestamp).
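As a rough picture of timestamp -> version resolution, the sketch below picks the latest commit at or before a timestamp. That reading of "active commit at time" is an assumption, as are the Commit type and the sorted in-memory list; the real getActiveCommitAtTime contract and its handling of out-of-range timestamps may differ.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; not the snapshot manager's actual lookup.
final class TimestampResolutionSketch {
  static final class Commit {
    final long version;
    final long timestampMs;

    Commit(long version, long timestampMs) {
      this.version = version;
      this.timestampMs = timestampMs;
    }
  }

  // Binary search for the latest commit whose timestamp is <= timestampMs.
  // Assumes commits are sorted by version with non-decreasing timestamps.
  // Returns empty when the timestamp precedes the earliest commit.
  static Optional<Commit> activeCommitAtTime(List<Commit> commits, long timestampMs) {
    int lo = 0;
    int hi = commits.size() - 1;
    Commit found = null;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      Commit candidate = commits.get(mid);
      if (candidate.timestampMs <= timestampMs) {
        found = candidate;
        lo = mid + 1;
      } else {
        hi = mid - 1;
      }
    }
    return Optional.ofNullable(found);
  }

  public static void main(String[] args) {
    List<Commit> commits =
        List.of(new Commit(0, 100), new Commit(1, 200), new Commit(2, 300));
    Optional<Commit> hit = activeCommitAtTime(commits, 250);
    System.out.println(hit.map(c -> "version " + c.version).orElse("no commit"));
  }
}
```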

Collaborator Author

Currently no SnapshotManager is constructed inside the trait or DeltaChangelog. I haven't yet switched to your suggested approach of resolving the SparkTable directly at startVersion / startTimestamp via loadTable(ident, version/timestamp). The SparkTable resolved by the catalog already gives us access to the snapshot manager for arbitrary version / timestamp lookups, so pinning the table to startVersion didn't seem necessary. I'll take a look at this on Monday.

Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/cdc/CdcReadMode.java Outdated

SanJSp and others added 3 commits May 16, 2026 06:32
Co-authored-by: Isaac
Co-authored-by: Isaac

2 participants